using System.Text;
using Lemon.BackgroundJobs.Abstractions.DelayQueues;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Lemon.BackgroundJobs.RabbitMQ.DelayQueues;

public class DelayQueues : IDelayQueues
{
    private const string Exchange = "delay-exchange-direct";
    private const string RoutingKey = "routing-delay";
    
    private readonly IRabbitMQClient _rabbitMqClient;
    public DelayQueues(IRabbitMQClient rabbitMqClient)
    {
        _rabbitMqClient = rabbitMqClient;
    }
    
    /// <summary>
    /// 发布消息(延迟消息)
    /// </summary>
    /// <param name="msg"></param>
    /// <param name="queue"></param>
    /// <param name="delay">延迟时间（毫秒）</param>
    /// <typeparam name="T"></typeparam>
    public void Publish<T>(T msg, string queue, int delay) where T : class, new()
    {
        string exchange = $@"{queue}-{Exchange}";
        using (var channel = _rabbitMqClient.GetConnection().CreateModel())
        {
            Dictionary<string, object> arguments = new Dictionary<string, object>
            {
                {"x-expires", delay + 160000},
                {"x-message-ttl", delay}, //队列上消息过期时间，应小于队列过期时间
                {"x-dead-letter-exchange", exchange}, //过期消息转向路由  
                {"x-dead-letter-routing-key", RoutingKey} //过期消息转向路由相匹配routing-key
            };
            channel.QueueDeclare(queue: queue,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: arguments);

            var message = Newtonsoft.Json.JsonConvert.SerializeObject(msg);
            var body = Encoding.UTF8.GetBytes(message);

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            channel.BasicPublish(exchange: "",
                routingKey: queue,
                false,
                basicProperties: properties,
                body: body);
        }
    }

    /// <summary>
    /// 事件订阅
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="queue"></param>
    /// <param name="handler"></param>
    public void Subscribe<T>(string queue, Action<T> handler) where T : class, new()
    {
        string exchange = $@"{queue}-{Exchange}";
        var channel = _rabbitMqClient.GetConnection().CreateModel();
        channel.ExchangeDeclare(exchange: exchange, type: ExchangeType.Direct, false, false, null);

        var queueName = channel.QueueDeclare().QueueName;
        channel.QueueBind(queue: queueName,
            exchange: exchange,
            routingKey: RoutingKey);

        channel.BasicQos(0, 20, false);
        
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            T? msg = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(message);

            if (msg == null)
            {
                throw new Exception("message is null");
            }
            handler(msg);
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        channel.BasicConsume(queueName, false, "", false, false, null, consumer);
    }
}